博客中代码地址:https://github.com/farliu/farpc.git
本文实现的是远程调用,也就是图片中的第4步,dubbo作为一款RPC框架,这是它的核心功能,dubbo提供了很多种方式,如下图:
原理分析 首先科普一下RPC三个字母,即Remote Procedure Call。简单来说就是从一台机器(客户端)上通过参数传递的方式调用另一台机器(服务器)上的一个函数或方法(可以统称为服务)并得到返回的结果。
回想一下,java本地方法调用。假如在电脑ComputerA上有一个类ClassA,其中有methodA()方法,我们调用的话,就是new一个ClassA的对象classA,然后classA.methodA()来调用。这其中值得深思,凭什么我们能new一个ClassA的对象?我们能不能new一个ComputerB机器上的类ClassB的对象呢?
第二个答案肯定都知道,肯定是不行。第一个问题,我们为什么能在ComputerA上new一个ClassA的对象,是因为在ComputerA上存在ClassA类的class文件,通过JVM加载后,我们可以实现对它的调用。所以为了解决第二个问题,前辈们还真想出了办法。你不是只要class文件就行了嘛,那就给你,ComputerA打个jar包给ComputerB去加载。也就达到了调用的效果。
以上其实就是解决两个应用之间交互的早期办法。但是当系统依赖复杂后,这种方式极为不妥,每一个系统都得加载子系统的所有class,非常不合适。以及还有敏感代码允许泄露等问题。
为了解决这个问题,我们也就出现RPC服务,就是当ComputerA要调用ComputerB的方法时,ComputerA通过某种方式告诉ComputerB,再由ComputerB执行完之后,将结果告诉ComputerA。这就是RPC最初的设想。我们归纳一下几个步骤
ComputerA将自己的需要调用的方法和参数准备封装好。
按照约定的方式,将封装好的参数传给ComputerB
ComputerB收到约定的数据后,解析获得ComputerA需要调用的方法和参数。
ComputerB按照ComputerA给的数据,执行对应的方法。
ComputerB将执行结果按照约定返回ComputerA。
可以看到这个过程,极为重要的就是数据传输,为了实现数据传输,我们搞出了很多花样。比较通常的就是,将参数转成xml,通过Http传送给另一台机器,后来发现xml体积太大,我们又将json代替了xml。再后来我们又觉得每次用http协议,都得重新连接,又使用socket实现长连接。再后来觉得socket实现阻塞IO,效率不高,又推送了NIO,以及selector、channel这些专业术语。这都是在优化传输过程,本章采用netty来实现传输。
对于ComputerB执行相应的方法,基于以上的约定,ComputerB拿到所需的参数后,使用java反射就能调用具体的方法了。
项目结构介绍 本节涉及博客中代码的module,farpc-rpc(远程调用)、farpc-demo。
初始化netty 本章使用netty实现rpc,自然要导入jar包。
1 2 3 4 5 <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > 4.1.36.Final</version > </dependency >
秉承可扩展设计,提供两个接口。
1 2 3 4 5 6 7 8 9 @FarSPI ("netty" )public interface IProviderServer { void start(String selfAddress); } @FarSPI ("netty" )public interface IConsumerServer { Object execute(String address, RequestDTO requestDTO); }
服务端 服务端就是常规的netty代码,启动服务,然后配置Handler,处理接收的信息。值得一提的是,在服务端启动的时候,我会去扫描所有标注了Provider注解的类,然后将其注入到注册中心,并为一个container保存对应的对象,用于反射执行指定方法。
Provider模拟dubbo中的@Service注解,用于注册服务。
1 2 3 4 5 6 7 @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface Provider { Class interfaceClazz(); String name() default "" ; }
Container扫描被Provider修饰的类,然后反射生成对象,将其保存到本地容器供反射执行指定的方法、保存到注册中心供消费端发现服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class Container { private static final Logger logger = LoggerFactory.getLogger(Container.class); private static IRegistrar registrar = RegistrarFactory.getRegistrar(); private static Map<String , Object> providers = new HashMap <String , Object>(); static { Reflections reflections = new Reflections (new ConfigurationBuilder () .setUrls(ClasspathHelper.forPackage("com.ofcoder" )) .setScanners(new TypeAnnotationsScanner ())); Set<Class<?>> classes = reflections.getTypesAnnotatedWith(Provider.class, true ); for (Class<?> clazz : classes ) { try { Provider annotation = clazz.getAnnotation(Provider.class); Object provider = clazz.new Instance (); String canonicalName = annotation.interfaceClazz().getCanonicalName(); providers.put(canonicalName, provider); } catch (Exception e) { logger.error(e.getMessage(), e); } } } public static void registerSelf(String selfAddress){ for (String service : providers .keySet()) { registrar.register(selfAddress, service); } } public static Map<String , Object> getProviders() { return providers; } }
NettyProviderHandler用来处理收到的信息,然后根据收到的数据,从本地容器中取得对象,调用指定的方法,并将执行结果返回给消费端。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class NettyProviderHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory . getLogger(NettyProviderHandler.class ) ; @Override public void channelRead(ChannelHandlerContext ctx , Object msg ) throws Exception { super.channelRead(ctx , msg ) ; RequestDTO requestDTO = (RequestDTO) msg; Object result = new Object() ; logger.info("receive request.. {}" , requestDTO); if (Container . getProviders() .containsKey(requestDTO .getClassName () )) { Object provider = Container . getProviders() .get(requestDTO.getClassName() ); Class<?> providerClazz = provider.getClass() ; Method method = providerClazz.getMethod(requestDTO .getMethodName () , requestDTO.getTypes() ); result = method .invoke(provider, requestDTO.getParams() ); } ctx.write(result); ctx.flush() ; ctx.close() ; } }
NettyProviderServer就是监听指定的端口,启动服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class NettyProviderServer implements IProviderServer { private static final Logger logger = LoggerFactory . getLogger(NettyProviderServer.class ) ; public void start(String selfAddress) { Container . registerSelf(selfAddress ) ; String[] addrs = selfAddress.split(":" ); String ip = addrs[0 ] ; Integer port = Integer . parseInt(addrs [1]) ; publisher(ip, port); } private void publisher(String ip, Integer port) { try { EventLoopGroup bossGroup = new NioEventLoopGroup() ; EventLoopGroup workerGroup = new NioEventLoopGroup() ; ServerBootstrap bootstrap = new ServerBootstrap() ; bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel .class ) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel ) throws Exception { ChannelPipeline pipeline = channel.pipeline() ; pipeline.addLast(new ObjectEncoder() ); pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled (NettyProviderServer.class .getClassLoader () ))); pipeline.addLast(new NettyProviderHandler() ); } }).option(ChannelOption.SO_BACKLOG, 128 ).childOption(ChannelOption.SO_KEEPALIVE, true ) ; ChannelFuture future = bootstrap.bind(ip, port).sync() ; logger.info("netty server is started..." ); future.channel() .closeFuture() .sync() ; } catch (Exception e) { logger.error(e.getMessage() , e); } } }
消费端 NettyConsumerServer用于发送请求,将封装好的参数发给服务提供者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class NettyConsumerServer implements IConsumerServer { private static final Logger logger = LoggerFactory . getLogger(NettyConsumerServer.class ) ; public Object execute(String serivceAddress, RequestDTO requestDTO) { String[] addrs = serivceAddress.split(":" ); String host = addrs[0 ] ; Integer port = Integer . parseInt(addrs [1]) ; final NettyConsumerHandler consumerHandler = new NettyConsumerHandler() ; EventLoopGroup group = new NioEventLoopGroup() ; try { Bootstrap bootstrap = new Bootstrap() ; bootstrap.group(group) .channel(NioSocketChannel .class ) .option(ChannelOption.TCP_NODELAY, true ) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel ) throws Exception { ChannelPipeline pipeline = channel.pipeline() ; pipeline.addLast( new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled (ConsumerProxy.class .getClassLoader () ))); pipeline.addLast( new ObjectEncoder() ); pipeline.addLast(consumerHandler ) ; } }); ChannelFuture future = bootstrap.connect(host, port).sync() ; Channel channel = future.channel() ; channel.writeAndFlush(requestDTO ) ; logger.info("send request..., {}" , requestDTO); channel.closeFuture() .sync() ; } catch (Exception e) { logger.error(e.getMessage() , e); } finally { group.shutdownGracefully() ; } return consumerHandler.getResponse() ; } }
NettyConsumerHandler用于处理提供端返回的结果,这里没有做过多处理,直接返回。
1 2 3 4 5 6 7 8 9 10 11 12 public class NettyConsumerHandler extends ChannelInboundHandlerAdapter { private Object response; public Object getResponse() { return response; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { response = msg; } }
使用SPI整合 在上一章,已经把SPI集成的很不错了,这里我们可以按照上一章套路管理RPC服务。我们提供一个Factory,用来替代自适应扩展。
1 2 3 4 5 6 7 8 9 10 11 12 13 public class RpcFactory { public static IConsumerServer getConsumerService() { String protocol = Property .Rpc . protocol; IConsumerServer extension = ExtensionLoader . getExtensionLoader(IConsumerServer.class ) .getExtension(protocol ) ; return extension; } public static IProviderServer getProviderServer() { String protocol = Property .Rpc . protocol; IProviderServer extension = ExtensionLoader . getExtensionLoader(IProviderServer.class ) .getExtension(protocol ) ; return extension; } }
配置文件
1 2 3 4 5 6 7 com.ofcoder .farpc .rpc .IConsumerServer netty=com.ofcoder .farpc .rpc .netty .NettyConsumerServer ------------------------------------------------------ com.ofcoder .farpc .rpc .IProviderServer netty=com.ofcoder .farpc .rpc .netty .NettyProviderServer
测试 1 2 3 4 5 6 7 8 9 10 11 12 @Test public void provider() throws IOException { IProviderServer providerServer = RpcFactory.getProviderServer(); providerServer.start ("127.0.0.1:20880"); System .in .read (); } @Test public void consumer() { IConsumerServer consumerService = RpcFactory.getConsumerService(); Object execute = consumerService.execute ("127.0.0.1:20880", new RequestDTO()); }
先启动提供者,然后在执行消费。可以在提供者的控制台看到,相应的日志,也就说明达到我们需要的效果了。如下
1 2 3 .. .main INFO netty.NettyProviderServer: netty server is started.. . nioEventLoopGroup-3-1 INFO netty.NettyProviderHandler: receive request.. RequestDTO{className ='null' , methodName ='null' , types =null , params =null }
dubbo源码 这一节,来印证dubbo实现过程,也算是当作源码导读。这一节也分为两段分析,一消费端调用过程,二服务端收到请求处理过程。
整个过程可以总结为:首先服务消费者通过代理对象 Proxy 发起远程调用,接着通过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。Server 在收到请求后,首先要做的事情是对数据包进行解码。然后将解码后的请求发送至分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调用具体的服务。这就是一个远程调用请求的发送与接收过程。
消费端 消费端调用复杂在于链路太长,由Proxy调用开始,会经过一系列的Invoker,直到DubboInvoker再去真正发起请求
1 2 3 4 5 6 Proxy . greet -> InvokerInvocationHandler . invoke -> MockClusterInvoker . invoke -> ... -> AbstractInvoker . invoke -> DubboInvoker . doInvoke
InvokerInvocationHandler用于排除调用toString()、equals()、hashCode()这些方法。MockClusterInvoker主要实现了降级逻辑,在服务调用失败后用于返回默认值。以及后续还有FailoverClusterInvoker,最后会调到DubboInvoker.doInvoke,着重关心这一块,中间那些增强的Invoker逻辑,可以自己了解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 protected Result do Invoke(final Invocation invocation ) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils . getMethodName(invocation ) ; inv.setAttachment(PATH_KEY, getUrl () .getPath() ); inv.setAttachment(VERSION_KEY, version ) ; ExchangeClient currentClient; if (clients.length == 1 ) { currentClient = clients[0 ] ; } else { currentClient = clients[index .getAndIncrement () % clients .length ] ; } try { boolean isOneway = RpcUtils . isOneway(getUrl () , invocation); int timeout = getUrl() .getMethodParameter(methodName , TIMEOUT_KEY, DEFAULT_TIMEOUT) ; if (isOneway) { boolean isSent = getUrl() .getMethodParameter(methodName , Constants.SENT_KEY, false ) ; currentClient.send(inv, isSent); RpcContext . getContext() .setFuture(null ) ; return AsyncRpcResult .new DefaultAsyncResult(invocation ) ; } else { AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv ) ; CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); asyncRpcResult.subscribeTo(responseFuture ) ; RpcContext . getContext() .setFuture(new FutureAdapter(asyncRpcResult ) ); return asyncRpcResult; } } catch (TimeoutException e) { ... } catch (RemotingException e) { ... } }
消费端到此处,应分为第二段,既发送请求。这里dubbo会调用一系列Client,这些Client均是NettyClient的包装增强,对request()的调用都是一直往下传递。
1 2 3 4 5 6 ReferenceCountExchangeClient . request -> HeaderExchangeClient . request -> HeaderExchangeChannel . request -> AbstractClient . send -> NettyChannel . send -> NioClientSocketChannel . write
ReferenceCountExchangeClient为对象引用增加计数器,当close()调用时,该计数器减1。HeaderExchangeClient增加心跳检测,这里请求会转到Channel对象,并结束request()传递调用。HeaderExchangeChannel对Request的封装,并通过AbstractClient.getChannel()获取到NettyChannel对象并调用其send()方法。完成整个请求的发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void send (Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true ; int timeout = 0 ; try { ChannelFuture future = channel.write (message); if (sent) { timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); success = future .await(timeout); } Throwable cause = future .getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this , "Failed to send message ..." ); } if (!success) { throw new RemotingException(this , "Failed to send message ..." ); } }
提供端 在服务提供端获取到请求之后,然后交由NettyHandler.messageReceived处理,该方法会进行和消费端相关操作,以此会执行MultiMessageHandlerH、eartbeatHandler,最后由ChannelHandler将操作逻辑封装到Runable对象中,交给线程池进行调用处理,这个过程也成为线程派发,dubbo提供5中派发模式。
策略
用途
all
所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等
direct
所有消息都不派发到线程池,全部在 IO 线程上直接执行
message
只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行
execution
只有请求消息派发到线程池,不含响应。其它消息均在 IO 线程上执行
connection
在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池
dubbo默认选择策略是all,整理上述调用链路如下
1 2 3 4 5 6 NettyHandler#messageReceived -> AbstractPeer#received —> MultiMessageHandler#received —> HeartbeatHandler#received —> ALLChannelHandler#received —> ExecutorService#execute
ALLChannelHandler.received会初始化ChannelEventRunnable对象,由该对象真正完成调用,我们看看具体源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class ChannelEventRunnable implements Runnable { @Override public void run() { // 检测通道状态,对于请求或响应消息,此时 state = RECEIVED if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state ) { case CONNECTED: ... break; case DISCONNECTED: ... break; case SENT: ... break; case CAUGHT: ... break; default : logger.warn("unknown state: " + state + ", message is " + message); } } } }
这里多说一句,先用if判断出现频率比较高的消息类型,然后用switch处理其他类型,不用把频率较高的类型和普通类型同级判断,以此提高效率。我们开发过程中也可借鉴这一点。 ChannelEventRunnable作用类似于路由,将消息分别交给各自的ChannelHandler去处理,这里的对象为DecodeHandler,该Handler就是对Request或Response进行解码后,继续传递到HeaderExchangeHandler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 public class HeaderExchangeHandler implements ChannelHandlerDelegate { @Override public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis () ); final ExchangeChannel exchangeChannel = HeaderExchangeChannel . getOrAddChannel(channel ) ; try { if (message instanceof Request) { Request request = (Request) message; if (request.isEvent() ) { handlerEvent(channel , request ) ; } else { if (request.isTwoWay() ) { handleRequest(exchangeChannel , request ) ; } else { handler.received(exchangeChannel, request.getData() ); } } } else if (message instanceof Response) { handleResponse(channel , (Response) message); } else if (message instanceof String) { ... } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel . removeChannelIfDisconnected(channel ) ; } } Response handleRequest(ExchangeChannel channel , Request req ) throws RemotingException { Response res = new Response(req .getId () , req.getVersion() ); if (req.isBroken() ) { Object data = req.getData() ; String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils .to String((Throwable) data); else msg = data.to String() ; res.setErrorMessage("Fail to decode request due to: " + msg ) ; res.setStatus(Response.BAD_REQUEST) ; return res; } Object msg = req.getData() ; try { Object result = handler.reply(channel, msg); res.setStatus(Response.OK) ; res.setResult(result ) ; } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR) ; res.setErrorMessage(StringUtils.toString (e ) ); } return res; } }
接下来要说的就是定义在DubboProtocol类中匿名对象的reply方法,既ExchangeHandlerAdapter.reply()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class DubboProtocol extends AbstractProtocol { private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel , inv ) ; if (Boolean .TRUE .to String() .equals(inv.getAttachments() .get(IS_CALLBACK_SERVICE_INVOKE))) { } RpcContext . getContext() .setRemoteAddress(channel .getRemoteAddress () ); return invoker.invoke(inv); } throw new RemotingException(channel , "Unsupported request: ..." ) ; } ... }
先通过getInvoker()获取Invoker实例,然后调用invoke方法。getInvoker()方法先从缓存中获取,没命中则调用DubboExporter.getInvoker()继续创建。而Invoker的invoke方法是由AbstractProxyInvoker实现,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public abstract class AbstractProxyInvoker<T> implements Invoker<T> { @Override public Result invoke(Invocation invocation) throws RpcException { try { Object value = do Invoke(proxy , invocation .getMethodName () , invocation.getParameterTypes() , invocation.getArguments() ); ... return asyncRpcResult; } catch (InvocationTargetException e) { ... return AsyncRpcResult .new DefaultAsyncResult(null , e .getTargetException () , invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation .getMethodName () + " to " + getUrl() + ", cause: " + e.getMessage() , e); } } }
剩余的最后一个doInvoke,是一个抽象方法,由子类实现,而Invoker实现类是由JavassistProxyFactory 动态生成,具体可查看JavassistProxyFactory.getInvoker()方法。最后生成的代理类逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class Wrapper0 extends Wrapper implements ClassGenerator .DC { public static String [] pns; public static Map pts; public static String [] mns; public static String [] dmns; public static Class[] mts0; public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoService demoService; try { demoService = (DemoService)object; } catch (Throwable throwable) { throw new IllegalArgumentException (throwable); } try { if ("sayHello" .equals(string) && arrclass.length == 1 ) { return demoService.sayHello((String )arrobject[0 ]); } } catch (Throwable throwable) { throw new InvocationTargetException (throwable); } throw new NoSuchMethodException (new StringBuffer ().append("Not found method \"" ).append(string).append("\" in class com.alibaba.dubbo.demo.DemoService." ).toString()); } }
这里可以看到,最后不是通过反射去执行的,而是根据具体方法名路由的,然后调用执行的。所以以后谁要说反射执行,就拿这篇文章呼他的脸。不过话说回来,我们实现的调用还是通过反射,我们也看到了dubbo的实现太复杂了,如果再造一个轮子没必要,主要是弄清楚原理。